
Automated Test: span-flusher-multiprocess #315

Closed

Conversation


@admin-coderabbit admin-coderabbit commented Feb 4, 2026

This pull request was automatically created by @coderabbitai/e2e-reviewer.

Batch created pull request.

Summary by CodeRabbit

  • New Features

    • Added --flusher-processes CLI option to configure the maximum number of parallel processes for span flushing (defaults to 1).
    • Enhanced span flusher to support multi-process parallelism for improved throughput and scalability.
  • Documentation

    • Improved anti-patterns documentation with clearer examples for type-checking best practices.


coderabbit-eval bot commented Feb 4, 2026

📝 Walkthrough

The PR introduces multi-process parallelism to the span flusher, allowing configurable distribution of shard processing across multiple processes. Configuration is added via CLI options, factory parameter propagation, and comprehensive flusher refactoring to manage per-process buffers, health tracking, and backpressure handling. Tests updated accordingly.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Documentation & Configuration**<br>`CLAUDE.md`, `src/sentry/consumers/__init__.py` | Added a documentation example contrasting `hasattr` vs `isinstance` patterns for unions. Introduced the new `--flusher-processes` CLI option (default 1) in the process-spans consumer definition to control the number of parallel flushing processes. |
| **Factory Layer**<br>`src/sentry/spans/consumers/process/factory.py` | Added an optional `flusher_processes` parameter to `ProcessSpansStrategyFactory.__init__` and propagated it to the `SpanFlusher` instantiation via the `max_processes` argument. |
| **Core Flusher Implementation**<br>`src/sentry/spans/consumers/process/flusher.py` | Refactored from a single-process to a multi-process architecture. Added per-shard process mapping, per-process buffers, health tracking (`process_healthy_since`), backpressure tracking (`process_backpressure_since`), and process lifecycle management. Updated the main entry point to accept a shard list and tag metrics per shard. Introduced methods for process creation, health checks, and dynamic restarts. |
| **Test Updates**<br>`tests/sentry/spans/consumers/process/test_consumer.py`, `tests/sentry/spans/consumers/process/test_flusher.py` | Updated the `test_basic` decorator to use `django_db(transaction=True)`. Added a new test, `test_flusher_processes_limit`, to verify shard-to-process distribution. Updated the backpressure assertion to check any value across multiple per-process backpressure timestamps. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as CLI Argument Parser
    participant Factory as ProcessSpansStrategyFactory
    participant Flusher as SpanFlusher
    participant ProcessMgr as Process Manager
    participant Worker as Worker Process
    participant Buffer as SpansBuffer

    CLI->>Factory: Initialize with flusher_processes=N
    Factory->>Flusher: Create instance with max_processes=N
    Flusher->>ProcessMgr: Create process-to-shards mapping
    Flusher->>ProcessMgr: Initialize per-process structures<br/>(buffers, health, backpressure)

    loop For each shard group
        ProcessMgr->>Worker: Spawn worker process<br/>with assigned shards
        Worker->>Buffer: Create dedicated SpansBuffer
    end

    Note over Flusher,Worker: Message Processing Loop
    Flusher->>Flusher: Receive message
    Flusher->>Flusher: Check backpressure<br/>across all processes
    Flusher->>Buffer: Route to correct<br/>process buffer
    Flusher->>Worker: Signal buffer availability
    Worker->>Buffer: Process and flush segments
    Worker->>Flusher: Update per-process<br/>health_since/backpressure_since

    Note over Flusher: Health & Lifecycle
    Flusher->>Worker: Periodic health check
    alt Process unhealthy
        Flusher->>Worker: Terminate
        Flusher->>ProcessMgr: Restart for shard group
    end
```
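The health-and-lifecycle branch of the diagram can be illustrated with a minimal sketch. `ProcessHandle`, `HEALTH_TIMEOUT`, and the helper names are hypothetical, not the actual `SpanFlusher` API:

```python
# Illustrative sketch of per-process health tracking and restart-on-staleness,
# as described in the walkthrough; names are assumptions, not Sentry code.
import time

HEALTH_TIMEOUT = 10.0  # seconds a worker may go without reporting healthy


class ProcessHandle:
    def __init__(self, shards):
        self.shards = shards
        # Worker updates this timestamp each time it reports healthy.
        self.healthy_since = time.monotonic()


def is_healthy(handle, now):
    return now - handle.healthy_since < HEALTH_TIMEOUT


def check_and_restart(processes, now, restart):
    # Terminate and respawn any process whose health timestamp went stale,
    # keeping its shard assignment.
    for index, handle in processes.items():
        if not is_healthy(handle, now):
            processes[index] = restart(handle.shards)
```

The key design point the walkthrough describes is that a restart is scoped to one shard group: a stuck worker is replaced without disturbing the healthy processes.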

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 Hark! The span flusher leaps and bounds,
With processes dancing all around,
Each shard gets care, each buffer gleams,
Health checks run through distributed dreams,
Multi-process magic, shards so light! 🌟

🚥 Pre-merge checks | ✅ 1 | ❌ 1

❌ Failed checks (1 inconclusive)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Title check | ❓ Inconclusive | The title "Automated Test: span-flusher-multiprocess" is vague and generic; "Automated Test" doesn't convey what this PR implements beyond testing. | Revise the title to describe the main change, e.g. "Add multiprocess support to span flusher" or "Enable parallel span flushing across multiple processes". |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch span-flusher-multiprocess

Comment @coderabbit eval help to get the list of available commands and usage tips.

@coderabbit-eval coderabbit-eval bot closed this Feb 4, 2026

@coderabbit-eval coderabbit-eval bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/sentry/consumers/__init__.py`:
- Around line 430-438: The --flusher-processes click.Option currently allows 0
or negative values which lead to silent misconfiguration; update the Option
definition for "flusher_processes" (the click.Option in the "click_options"
list) to enforce a minimum of 1—either by using click.IntRange(min=1) as the
type or adding min=1/type validation or a click callback that raises a
click.BadParameter when value < 1; ensure this validation is applied alongside
multiprocessing_options so the flusher code that reads flusher_processes (the
path using max_processes or len(...)) always receives a positive integer.

In `@src/sentry/spans/consumers/process/flusher.py`:
- Line 199: The metric invocation using metrics.timer for
"spans.buffer.flusher.wait_produce" has an inconsistent tag key ("shards")
compared to other calls that use "shard"; update the tags argument in that
metrics.timer call to use tags={"shard": shard_tag} so the key matches the other
uses (see metrics.timer, shard_tag, and the metric name
"spans.buffer.flusher.wait_produce") to keep dashboard aggregation consistent.
🧹 Nitpick comments (2)
src/sentry/spans/consumers/process/flusher.py (2)

336-347: Unused loop variable `process_index`.

Per the static analysis hint, `process_index` is not used within the loop body. Use `_` or iterate over `.values()` directly.

Proposed fix:

```diff
-        for process_index, process in self.processes.items():
+        for process in self.processes.values():
```

127-132: Remove unused `_create_process_for_shard` method.

This method is never called anywhere in the codebase. A search across the repository shows only the method definition exists, with no callers. The similar method `_create_process_for_shards` (plural) is the one actually in use. Since there is no TODO comment or indication of future use, this appears to be dead code from an earlier API design and should be removed.

Comment on lines +430 to +438:

```python
        "click_options": [
            *multiprocessing_options(default_max_batch_size=100),
            click.Option(
                ["--flusher-processes", "flusher_processes"],
                default=1,
                type=int,
                help="Maximum number of processes for the span flusher. Defaults to 1.",
            ),
        ],
```


⚠️ Potential issue | 🟡 Minor

Consider adding a minimum-value constraint for --flusher-processes.

A value of 0 is silently treated as "use default" (due to max_processes or len(...) in the flusher), and a negative value would result in no processes being created at all — a silent failure with no error message. Adding a min=1 constraint or a click callback to validate would prevent misconfiguration.

Proposed fix:

```diff
             click.Option(
                 ["--flusher-processes", "flusher_processes"],
                 default=1,
                 type=int,
+                callback=lambda ctx, param, value: value if value >= 1 else (_ for _ in ()).throw(click.BadParameter("--flusher-processes must be >= 1")),
                 help="Maximum number of processes for the span flusher. Defaults to 1.",
             ),
```

Or more cleanly with `click.IntRange`:

```diff
             click.Option(
                 ["--flusher-processes", "flusher_processes"],
                 default=1,
-                type=int,
+                type=click.IntRange(min=1),
                 help="Maximum number of processes for the span flusher. Defaults to 1.",
             ),
```
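The behavior `click.IntRange(min=1)` enforces can be mirrored by a plain validator. This stdlib-only sketch (the helper name is hypothetical, not part of click or Sentry) shows the intended semantics: reject 0 and negative values loudly rather than letting them silently misconfigure the flusher:

```python
# Hypothetical stdlib-only analog of the click.IntRange(min=1) check proposed
# above; validate_flusher_processes is an illustrative name.

def validate_flusher_processes(value: int) -> int:
    """Return the value if it is a valid process count, else raise."""
    if value < 1:
        raise ValueError(f"--flusher-processes must be >= 1, got {value}")
    return value
```

Raising at argument-parsing time surfaces the mistake immediately, instead of the flusher later creating zero worker processes with no error message.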
🤖 Prompt for AI Agents
In `@src/sentry/consumers/__init__.py` around lines 430 - 438, The
--flusher-processes click.Option currently allows 0 or negative values which
lead to silent misconfiguration; update the Option definition for
"flusher_processes" (the click.Option in the "click_options" list) to enforce a
minimum of 1—either by using click.IntRange(min=1) as the type or adding
min=1/type validation or a click callback that raises a click.BadParameter when
value < 1; ensure this validation is applied alongside multiprocessing_options
so the flusher code that reads flusher_processes (the path using max_processes
or len(...)) always receives a positive integer.

```diff
                 produce(kafka_payload)

-                with metrics.timer("spans.buffer.flusher.wait_produce"):
+                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
```


⚠️ Potential issue | 🟡 Minor

Inconsistent metrics tag name: "shards" (plural) vs "shard" (singular) elsewhere.

Lines 185 and 192–196 use tags={"shard": shard_tag}, but this line uses tags={"shards": shard_tag}. This will cause the metric spans.buffer.flusher.wait_produce to be tracked under a different tag key, breaking dashboard queries and aggregation.

Proposed fix:

```diff
-                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

```diff
-                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
```
🤖 Prompt for AI Agents
In `@src/sentry/spans/consumers/process/flusher.py` at line 199, The metric
invocation using metrics.timer for "spans.buffer.flusher.wait_produce" has an
inconsistent tag key ("shards") compared to other calls that use "shard"; update
the tags argument in that metrics.timer call to use tags={"shard": shard_tag} so
the key matches the other uses (see metrics.timer, shard_tag, and the metric
name "spans.buffer.flusher.wait_produce") to keep dashboard aggregation
consistent.
